Vertex AI Pipeline で既に存在する Artifact を返す
from Vertex AI Pipelines
過去に作成した Artifact をまた使いたい
再利用できると同じ ML Metadata から Pipeline やその他もろもろを辿れる
今のところあまり良くはないがこうする
Importer に任せる
metadata 変えたくなったら別の Artifact になってしまうのを諦める
特に工夫しない、自分で Vertex ML Metadata 参照しても事態が好転しなかった......
やりたいこと
データ前処理パイプラインコンポーネントに、欲しいデータを要求する
env: str, target_id: str 的な、属性値で要求する、環境 hoge の id fuga のやつくれ
resolve コンポーネント
env, target_id から gs://... を解決する(定まるようにしておく)
ファイルがなければ初期化フローを実行して配置する
metadata に設定される値を一通り返す
impoter コンポーネント
dsl.importer(..., reimport=False) で resolve_task.output を参照する
metadata が一貫していれば同じ Vertex ML Metadata のものが返る
実装例
code:import_pipeline.py
from typing import NamedTuple
from kfp import dsl
@dsl.component(
base_image="python:3.11",
packages_to_install="google-cloud-storage",
)
def load_data_if_needed(env: str, id: str, file: str) -> NamedTuple( # type: ignore
"Outputs",
uri=str,
origin_uri=str,
content_type=str,
):
# ここでの前処理は別の GCS バケットからデータをコピーするだけとする
target_uri = f"gs://my-current-project-bucket/imported-data/{env}/{id}/{file}"
origin_uri = f"gs://other-origin-project-bucket/...."
client = storage.Client()
t_bucket, t_name = target_uri.split("/", 3)2:
o_bucket, o_name = origin_uri.split("/", 3)2:
# target に無ければコピー
if not storage.Blob.from_string(target_uri, client).exists():
client.bucket(o_bucket).copy_blob(
client.bucket(o_bucket).blob(o_name),
client.bucket(t_bucket),
t_name,
)
blob_meta = client.bucket(t_bucket).get_blob(t_name)
from collections import namedtuple
Outputs = namedtuple("Outputs", "uri", "origin_uri", "content_type")
return Outputs(
target_uri,
origin_uri,
blob_meta.content_type,
)
@dsl.pipeline(name="import-data")
def pipeline(env: str, id: str, file: str) -> dsl.Dataset:
load_task = load_data_if_needed(env=env, id=id, file=file)
importer = dsl.importer(
artifact_class=dsl.Dataset,
artifact_uri=load_task.outputs"uri",
metadata={
"origin_name": "other-origin-project",
"origin_uri": load_task.outputs"origin_uri",
"content_type": load_task.outputs"content_type",
},
reimport=False,
)
return importer.output
---.icon
以下は Import 周りの試行錯誤のログ
Vertex ML Metadata を参照しつつ
Impoter を使ってがんばる
自分で Artifact インスタンス作って返す
の2通りを検討
Importer を使う例
再利用というより既に外部にあるリソースを Artifact として参照する例
Special Case: Importer Components | Kubeflow
GoogleCloudPlatform/vertex-ai-samples@134a622 - notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb
Define a pipeline that uses your components and the Importer のあたり
code:importer.py
# pipeline で import する
importer = kfp.dsl.importer(
artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
artifact_class=dsl.Dataset,
reimport=False,
)
# importer.output で参照する
comp1_task = comp1(imported=importer.output)
import 時に Vertex ML Metadata に登録される
実行時にファイルが都度 gcs へコピーされたりはしない、pipeline の入出力の値部分だけ gcs に書かれる
reimport=False があれば既に ML Metadata に存在する場合作成されない
既に存在するかどうかのチェックは uri だけでなく schema_title や metadata 含めたものになる...と思う
別の component の返り値を artifact_uri に指定できる
metadata も指定できるが、DSL の都合上まとめてはできない...
code:importer_with_metadata.py
pretask = ...
# これは問題ない
importer = dsl.importer(
...,
metadata={"key1": pretask.outputs'key1', "key2": pretask.outputs'key2'},
)
# こういうのはだめ、outputs の各値は PipelineArtifactChannel だから
importer = dsl.importer(..., metadata=pretask.outputs'metadata')
importer = dsl.importer(..., metadata={**pretask.outputs'metadata'})
自分で ML Metadata 引いて Artifact を返す
Python 関数ベースのコンポーネントを使用する - Google Cloud Pipeline コンポーネントの使用  |  Vertex AI
Importer 使わないパターン
これはゴミができるか、都度新しい Artifact になってしまう
Vertex ML Metadata チェックして、あればそれを無ければ作る、Artifact にセットして返す
code:importer_artifact.py
...
import google.cloud.aiplatform as aip
try:
artifact = aip.Artifact.get_with_uri(target_gcs_uri)
except ValueError:
artifact = aip.Artifact.create(
schema_title="system.Artifact",
display_name="check_synced",
uri=target_gcs_uri,
metadata={"source_uri": origin_gcs_uri},
)
return dsl.Artifact(
name=artifact.resource_name,
uri=target_gcs_uri,
)
これは同じ Artifact が返るが、都度ゴミの Artifact が1個作られる
上は pipeline が実行段階で作ってた? もの、実行前にパイプライン作成段階で見えるやつ
下は自分で作ったもの
https://gyazo.com/44cc324dbe2f970f674a7bcb02bfead0
component が Artifact を返すと DAG から分かった段階で空の要素が作られてる
自分で Artifact を作るのではなく、component 引数の Output[Artifact] に値をセットする方式
ゴミは出ないが毎回新しい Artifact になってしまう、まあ DAG の段階で分かって id 払い出してるんだな
code:return_existing.py
def check_synced(
target_gcs_uri: str, origin_gcs_uri: str, file: dsl.Outputdsl.Dataset
):
...
import google.cloud.aiplatform as aip
try:
artifact = aip.Artifact.get_with_uri(target_gcs_uri)
except ValueError:
artifact = aip.Artifact.create(
schema_title="system.Artifact",
display_name="check_synced",
uri=target_gcs_uri,
metadata={"source_uri": origin_gcs_uri},
)
file.uri = artifact.uri
file.metadata = artifact.metadata
無ければ Artifact を作成 & 登録してそれを使いたい
あれば importer の返す値を使う、的なことをしたいのだが
Reuse artifact in different pipeline/run · Issue 7301 · kubeflow/pipelines
うーん
名前が importer.artifact になったり metadata 作りづらいけど importer のほうがいいかなあ
kubeflow/pipelines@master - sdk/python/kfp/dsl/importer_node.py#L65
PipelineArtifactChannel も対応してくれや
いやでも ParameterChannel を受ける artifact_uri に outputs 渡せて、metadata がだめなのへんな気がする
そしてこれは通るから、pipeline 側のコンパイル時に何か足りないのでは
code:koreha_ok.py
importer = dsl.importer(
artifact_class=dsl.Dataset,
artifact_uri=check_synced_task.outputs"uri",
metadata=check_synced_task.outputs, # これはいけるんだよね、json 文字列になるけど
reimport=False,
)
怒られてるのはここ
kubeflow/pipelines@b57f9e8 - sdk/python/kfp/compiler/pipeline_spec_builder.py#L585
metadata_protobuf_struct.update(task.importer_spec.metadata)
protobuf にできない? AttributeError: 'str' object has no attribute 'items'
動かない時 task.importer_spec.metadata は "{{$.inputs.parameters['metadata']}}"
動く時はこういうやつ {'foo': "{{$.inputs.parameters['metadata']}}"} で dict
ここは本来 dict が入ってるべき
task.importer_spec.artifact_uri はこういうやつ
{{channel:task=check-synced;name=uri;type=String;}}
これは PipelineParameterChannel の文字列表現か、runtime_parameter = uri している
metadata も runtime にできない?
うーん outputs['metadata'] のより詳細な型が分かって扱えないとだめか?
task.importer_spec.metadata が dict のときだけ protobuf にすればいいのでは
素通ししてみるとコンパイルは通るし実行できるけど metadata の設定はされないね
https://gyazo.com/18daedcbe1874a8a666fc7afbe648320
DAG 上では謎なことが起きている、Importer の出力と、下の transcribe に接続する2つの出力は同じ Artifact なのだが...